package cn.jdemo.juc.core.util.async.wrapper;

import cn.jdemo.juc.core.util.async.callback.ICallback;
import cn.jdemo.juc.core.util.async.callback.IWorker;
import cn.jdemo.juc.core.util.async.worker.ResultStateEnum;
import cn.jdemo.juc.core.util.async.worker.WorkResult;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *
 * @description
 * @date 2020/11/30
 */
public class WorkerWrapper<P,V> {

    /**
     * 该wrapper的唯一标识
     */
    private String id;

    private IWorker<P,V> worker;
    private P param;
    private ICallback<P,V> callback;

    private List<WorkerWrapper<?,?>> nextWrappers;

    private List<WorkerWrapper<?,?>> dependWrappers;

    /**
     * 当前completestage们的集合
     */
    private Map<String, WorkerWrapper> forParamUseWrappers;

    // ??? 也是个钩子变量，用来存临时的结果
    private volatile WorkResult<V> workResult = WorkResult.defaultResult();

    private AtomicInteger state = new AtomicInteger(INIT);

    private static final int INIT = 0;
    private static final int FINISH = 1;
    private static final int ERROR = 2;
    private static final int WORKING = 3;

    private WorkerWrapper(){
    }
    public WorkerWrapper(String id, IWorker<P, V> worker, P param, ICallback<P, V> callback, List<WorkerWrapper<?,?>> nextWrappers) {
        this.id = id;
        this.param = param;
        this.worker=worker;
        this.callback = callback;
        this.nextWrappers = nextWrappers;
    }

    // 异步执行
    public void work(ExecutorService es, long timeout, Map<String, WorkerWrapper> forParamUseWrappers) {
        // workerWrapper.
        work(es, null, timeout, forParamUseWrappers);
    }

    private void work(ExecutorService poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {

        this.forParamUseWrappers = forParamUseWrappers;
        //将自己放到所有wrapper的集合里去
        forParamUseWrappers.put(id, this);

        long now = System.currentTimeMillis();
        //总的已经超时了，就快速失败，进行下一个
        if (remainTime <= 0) {
            fastFail(INIT, null);
            beginNext(poolExecutor, now, remainTime);
            return;
        }

        //如果自己已经执行过了。
        //可能有多个依赖，其中的一个依赖已经执行完了，并且自己也已开始执行或执行完毕。当另一个依赖执行完毕，又进来该方法时，就不重复处理了
        if (getState() == FINISH || getState() == ERROR) {
            beginNext(poolExecutor, now, remainTime);
            return;
        }

        if (dependWrappers == null || dependWrappers.size()==0){
            // 执行自身业务
            fire();
            // 执行下次业务
            beginNext(poolExecutor, now, remainTime);
            return;
        }

         /*如果有前方依赖，存在两种情况
         一种是前面只有一个wrapper。即 A  ->  B
        一种是前面有多个wrapper。A C D ->   B。需要A、C、D都完成了才能轮到B。但是无论是A执行完，还是C执行完，都会去唤醒B。
        所以需要B来做判断，必须A、C、D都完成，自己才能执行 */
        //只有一个依赖
        if (dependWrappers.size() == 1) {
            // doDependsOneJob(fromWrapper);
            beginNext(poolExecutor, now, remainTime);
        } else {
            //有多个依赖时
            // doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime);
        }
    }

    private void beginNext(ExecutorService poolExecutor, long now, long remainTime) {
        //花费的时间
        long costTime = System.currentTimeMillis() - now;
        if (nextWrappers==null || nextWrappers.size()==0){
            return;
        }
        if (nextWrappers.size() == 1){
            nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime-costTime,forParamUseWrappers);
        }

        CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
        for (int i=0;i<nextWrappers.size();i++){
            int finalI = i;
            futures[finalI] = CompletableFuture.runAsync(() -> {
                nextWrappers.get(finalI).work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
            }, poolExecutor);
        }
        try {
            CompletableFuture.allOf(futures).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

    private void fire() {
        //阻塞取结果
        workResult = workerDoJob();
    }

    public List<WorkerWrapper<?, ?>> getNextWrappers() {
        return nextWrappers;
    }

    private WorkResult<V> workerDoJob() {
        // 开始执行???
        callback.begin();

        //如果已经不是init状态了，说明正在被执行或已执行完毕。这一步很重要，可以保证任务不被重复执行
        if (!compareAndSetState(INIT, WORKING)) {
            return workResult;
        }

        V resultValue = worker.action(param,forParamUseWrappers);

        // 如果状态不是在working,说明别的地方已经修改了
        if (!compareAndSetState(WORKING, FINISH)){
            return workResult;
        }

        workResult.setResult(resultValue);
        workResult.setResultState(ResultStateEnum.SUCCESS);
        callback.result(true, param, workResult);

        return workResult;
    }

    /**
     * 总控制台超时，停止任务
     */
    public void stopNow() {
        if (getState() == INIT || getState() == WORKING) {
            fastFail(getState(), null);
        }
    }

    // 快速失败
    private boolean fastFail(int expect, Exception ex) {
        //试图将它从expect状态,改成Error
        if (!compareAndSetState(expect, ERROR)) {
            return false;
        }
        //尚未处理过结果
        if (checkIsNullResult()) {
            if (ex == null) {
                workResult = defaultResult();
            } else {
                workResult = defaultExResult(ex);
            }
        }
        callback.result(false, param, workResult);
        return true;
    }

    private boolean checkIsNullResult() {
        return ResultStateEnum.DEFAULT == workResult.getResultState();
    }

    private WorkResult<V> defaultResult() {
        workResult.setResultState(ResultStateEnum.TIMEOUT);
        workResult.setResult(worker.defaultValue());
        return workResult;
    }
    private WorkResult<V> defaultExResult(Exception ex) {
        workResult.setResultState(ResultStateEnum.EXCEPTION);
        workResult.setResult(worker.defaultValue());
        workResult.setEx(ex);
        return workResult;
    }

    private boolean compareAndSetState(int expect, int error) {
        return state.compareAndSet(expect, error);
    }

    private int getState() {
        return state.get();
    }

    public static class Builder<W,C>{
        private String id = UUID.randomUUID().toString();
        private ICallback<W,C> callback;
        private IWorker<W,C> worker;
        private W param;
        private List<WorkerWrapper<?,?>> nextWrappers;

        public Builder<W,C> worker(IWorker<W, C> worker){
            this.worker = worker;
            return this;
        }
        public Builder<W,C> callback(ICallback<W, C> callback){
            this.callback = callback;
            return this;
        }
        public Builder<W,C> param(W param){
            this.param = param;
            return this;
        }
        public Builder<W,C> id(String id){
            if (id != null &&  !"".equals(id)){
                this.id = id;
            }
            return this;
        }

        public Builder<W, C> next(WorkerWrapper<?,?> wrapper) {
            if (nextWrappers == null){
                nextWrappers = new LinkedList();
            }
            nextWrappers.add(wrapper);
            return this;
        }
        public Builder<W, C> next(WorkerWrapper<?,?>... wrappers) {
            if (wrappers==null){
                return this;
            }
            for (WorkerWrapper wrapper:wrappers) {
                next(wrapper);
            }
            return this;
        }

        // 构建WorkerWrapper对象
        public WorkerWrapper<W, C> build() {
            WorkerWrapper wrapper = new WorkerWrapper<W, C>(id,worker,param,callback,nextWrappers);
            return wrapper;
        }
    }
}
